{
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6-final"
  },
  "orig_nbformat": 2,
  "kernelspec": {
   "name": "python3",
   "display_name": "Python 3",
   "language": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2,
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .master(\"local\")\\\n",
    "        .appName(\"DataProcess\")\\\n",
    "        .config(\"spark.executor.memory\",\"3g\")\\\n",
    "        .config(\"spark.executor.instances\",\"5\")\\\n",
    "        .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入user_log数据\n",
    "user_log = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/data_format1/user_log_format1.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-------+------+---------+--------+----------+-----------+\n|user_id|item_id|cat_id|seller_id|brand_id|time_stamp|action_type|\n+-------+-------+------+---------+--------+----------+-----------+\n|      1| 112203|  1252|     4026|    1469|      1018|          0|\n|      1|1110495|   992|     1019|    6805|      1111|          0|\n|      1|1110495|   992|     1019|    6805|      1111|          0|\n|      1| 556107|  1252|      739|    6822|      1018|          0|\n|      1| 411984|  1252|     4177|    1960|      1018|          0|\n|      1| 112203|  1252|     4026|    1469|      1018|          0|\n|      1| 112203|  1252|     4026|    1469|      1021|          2|\n|      1| 112203|  1252|     4026|    1469|      1021|          0|\n|      1|1110495|   992|     1019|    6805|      1111|          0|\n|      1|1110495|   992|     1019|    6805|      1111|          0|\n+-------+-------+------+---------+--------+----------+-----------+\n\n"
     ]
    }
   ],
   "source": [
    "user_log.orderBy(\"user_id\").limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入训练集\n",
    "df_train = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/data_format1/train_format1.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+-----+\n|user_id|merchant_id|label|\n+-------+-----------+-----+\n|  34176|       3906|    0|\n|  34176|        121|    0|\n|  34176|       4356|    1|\n|  34176|       2217|    0|\n| 230784|       4818|    0|\n| 362112|       2618|    0|\n|  34944|       2051|    0|\n| 231552|       3828|    1|\n| 231552|       2124|    0|\n| 232320|       1168|    0|\n+-------+-----------+-----+\n\n"
     ]
    }
   ],
   "source": [
    "df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入用户信息\n",
    "user_info = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/data_format1/user_info_format1.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+---------+------+\n|user_id|age_range|gender|\n+-------+---------+------+\n| 376517|        6|     1|\n| 234512|        5|     0|\n| 344532|        5|     0|\n| 186135|        5|     0|\n|  30230|        5|     0|\n| 272389|        6|     1|\n| 281071|        4|     0|\n| 139859|        7|     0|\n| 198411|        5|     1|\n|  67037|        4|     1|\n+-------+---------+------+\n\n"
     ]
    }
   ],
   "source": [
    "user_info.limit(10).show()"
   ]
  },
  {
   "source": [
    "想要建立的特征\n",
    "需要根据user_id，和merchant_id（seller_id）,从用户画像表以及用户日志表中提取特征，填写到df_train这个数据框中，从而训练评估模型 需要建立的特征如下：\n",
    "\n",
    "用户的年龄(age_range)  \n",
    "用户的性别(gender)  \n",
    "某用户在该商家日志的总条数(total_logs)  \n",
    "用户浏览的商品的数目，就是浏览了多少个商品(unique_item_ids)  \n",
    "浏览的商品的种类的数目，就是浏览了多少种商品(categories)  \n",
    "用户浏览的天数(browse_days)  \n",
    "用户单击的次数(one_clicks)  \n",
    "用户添加购物车的次数(shopping_carts)  \n",
    "用户购买的次数(purchase_times)  \n",
    "用户收藏的次数(favourite_times)  "
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "#age_range,gender特征添加\n",
    "df_train = df_train.join(user_info,[\"user_id\"],\"left\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+---------+-----+\n|user_id|seller_id|count|\n+-------+---------+-----+\n|      1|      739|    1|\n|      1|      925|    4|\n|      1|     1019|   14|\n|      1|     4335|    1|\n|      1|     4026|    5|\n|      1|     2245|    5|\n|      1|      471|    1|\n|      1|     1156|    1|\n|      1|     4177|    1|\n|      2|     1784|    2|\n|      2|     1544|    1|\n|      2|     1974|   20|\n|      2|     2223|    2|\n|      2|      420|   26|\n|      2|     1679|    3|\n|      2|     3716|    1|\n|      2|     4924|    1|\n|      2|     2412|    1|\n|      2|     1816|    1|\n|      2|     1179|    1|\n+-------+---------+-----+\n\n"
     ]
    }
   ],
   "source": [
    "#total_logs(某用户在该商家日志的总条数)特征添加\n",
    "total_logs_temp = user_log.groupby([\"user_id\",\"seller_id\"]).count()\n",
    "total_logs_temp.orderBy(\"user_id\").limit(20).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+----------+\n|user_id|merchant_id|total_logs|\n+-------+-----------+----------+\n| 328862|       1200|         5|\n+-------+-----------+----------+\n\n"
     ]
    }
   ],
   "source": [
    "total_logs_temp = total_logs_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"total_logs\")\n",
    "total_logs_temp.limit(1).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+-----+---------+------+----------+\n|user_id|merchant_id|label|age_range|gender|total_logs|\n+-------+-----------+-----+---------+------+----------+\n|    464|       4718|    0|        6|     0|         4|\n|    867|       3152|    0|        3|     0|        17|\n|   1882|       4377|    0|        6|     1|         4|\n|   2450|       2760|    0|        0|     0|         5|\n|   2766|       3885|    0|        4|     1|         1|\n|   2829|        467|    0|        4|     0|         3|\n|   2861|       4973|    0|        6|     0|         1|\n|   3359|       4090|    0|        2|     0|         8|\n|   3487|       1861|    0|        6|     1|         4|\n|   5460|       1485|    0|        4|     0|        58|\n+-------+-----------+-----+---------+------+----------+\n\n"
     ]
    }
   ],
   "source": [
    "df_train = df_train.join(total_logs_temp,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+---------+-------+\n|user_id|seller_id|item_id|\n+-------+---------+-------+\n| 328862|     1253| 315621|\n| 328862|     1138| 431931|\n| 328862|      465|  92347|\n| 328862|     3828| 671405|\n| 356311|     4650| 238223|\n|  92396|     4499| 680664|\n|  37908|     2883| 150707|\n|   2859|      296| 408609|\n|   2859|     1703| 220138|\n| 153790|      158| 209679|\n|  26516|     3326| 154863|\n| 366342|     1892|  44341|\n| 171799|     1488| 614056|\n| 300681|      606| 402611|\n| 300681|     1461| 603371|\n| 300681|     2594| 533712|\n| 300681|      527| 117760|\n| 300681|      606| 552140|\n| 300681|     2900| 637306|\n| 401805|     4432| 209498|\n+-------+---------+-------+\nonly showing top 20 rows\n\n"
     ]
    }
   ],
   "source": [
    "#unique_item_ids特征添加\n",
    "unique_item_ids_temp = user_log.groupby([\"user_id\",\"seller_id\",\"item_id\"]).count()\n",
    "unique_item_ids_temp = unique_item_ids_temp.selectExpr(\"user_id\",\"seller_id\",\"item_id\")\n",
    "unique_item_ids_temp.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+---------------+\n|user_id|merchant_id|unique_item_ids|\n+-------+-----------+---------------+\n| 303135|        306|             51|\n| 197844|       1550|              7|\n| 380525|        913|              1|\n|  70920|       2328|              1|\n|  96653|       2114|             19|\n|  24240|       3471|              1|\n| 416856|        133|              7|\n| 390478|        113|              2|\n| 257715|       1617|              4|\n|  29193|       1606|             14|\n+-------+-----------+---------------+\n\n"
     ]
    }
   ],
   "source": [
    "unique_item_ids_temp = unique_item_ids_temp.groupBy([\"user_id\",\"seller_id\"]).count()\n",
    "unique_item_ids_temp = unique_item_ids_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"unique_item_ids\")\n",
    "unique_item_ids_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+-----+---------+------+----------+---------------+\n|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|\n+-------+-----------+-----+---------+------+----------+---------------+\n|    464|       4718|    0|        6|     0|         4|              2|\n|    867|       3152|    0|        3|     0|        17|              3|\n|   1882|       4377|    0|        6|     1|         4|              1|\n|   2450|       2760|    0|        0|     0|         5|              1|\n|   2766|       3885|    0|        4|     1|         1|              1|\n|   2829|        467|    0|        4|     0|         3|              2|\n|   2861|       4973|    0|        6|     0|         1|              1|\n|   3359|       4090|    0|        2|     0|         8|              3|\n|   3487|       1861|    0|        6|     1|         4|              1|\n|   5460|       1485|    0|        4|     0|        58|             29|\n+-------+-----------+-----+---------+------+----------+---------------+\n\n"
     ]
    }
   ],
   "source": [
    "df_train = df_train.join(unique_item_ids_temp,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "#categories特征构建\n",
    "categories_temp = user_log.groupby([\"user_id\", \"seller_id\", \"cat_id\"]).count()\n",
    "#categories_temp.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "categories_temp = categories_temp.selectExpr(\"user_id\",\"seller_id\",\"cat_id\")\n",
    "#categories_temp.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "categories_temp = categories_temp.groupBy([\"user_id\",\"seller_id\"]).count()\n",
    "categories_temp = categories_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"categories\")\n",
    "#categories_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+-----+---------+------+----------+---------------+----------+\n|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|categories|\n+-------+-----------+-----+---------+------+----------+---------------+----------+\n|    464|       4718|    0|        6|     0|         4|              2|         2|\n|    867|       3152|    0|        3|     0|        17|              3|         1|\n|   1882|       4377|    0|        6|     1|         4|              1|         1|\n|   2450|       2760|    0|        0|     0|         5|              1|         1|\n|   2766|       3885|    0|        4|     1|         1|              1|         1|\n|   2829|        467|    0|        4|     0|         3|              2|         1|\n|   2861|       4973|    0|        6|     0|         1|              1|         1|\n|   3359|       4090|    0|        2|     0|         8|              3|         2|\n|   3487|       1861|    0|        6|     1|         4|              1|         1|\n|   5460|       1485|    0|        4|     0|        58|             29|         4|\n+-------+-----------+-----+---------+------+----------+---------------+----------+\n\n"
     ]
    }
   ],
   "source": [
    "df_train = df_train.join(categories_temp,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "#browse_days特征构建\n",
    "browse_days_temp = user_log.groupby([\"user_id\",\"seller_id\",\"time_stamp\"]).count()\n",
    "#browse_days_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "browse_days_temp = browse_days_temp.selectExpr(\"user_id\",\"seller_id\",\"time_stamp\")\n",
    "browse_days_temp = browse_days_temp.groupby([\"user_id\",\"seller_id\"]).count()\n",
    "browse_days_temp = browse_days_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"browse_days\")\n",
    "#browse_days_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+\n|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|categories|browse_days|\n+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+\n|    464|       4718|    0|        6|     0|         4|              2|         2|          1|\n|    867|       3152|    0|        3|     0|        17|              3|         1|          4|\n|   1882|       4377|    0|        6|     1|         4|              1|         1|          1|\n|   2450|       2760|    0|        0|     0|         5|              1|         1|          1|\n|   2766|       3885|    0|        4|     1|         1|              1|         1|          1|\n|   2829|        467|    0|        4|     0|         3|              2|         1|          1|\n|   2861|       4973|    0|        6|     0|         1|              1|         1|          1|\n|   3359|       4090|    0|        2|     0|         8|              3|         2|          3|\n|   3487|       1861|    0|        6|     1|         4|              1|         1|          2|\n|   5460|       1485|    0|        4|     0|        58|             29|         4|          6|\n+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+\n\n"
     ]
    }
   ],
   "source": [
    "df_train = df_train.join(browse_days_temp,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "#先将处理好的暂时写到文件中\n",
    "df_train.write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/procd_train_temp.csv\", mode = 'overwrite')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "#为了避免jvm崩掉，只能另起一个session\n",
    "spark.stop()\n",
    "spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .master(\"local\")\\\n",
    "        .appName(\"DataProcess2\")\\\n",
    "        .config(\"spark.executor.memory\",\"3g\")\\\n",
    "        .config(\"spark.executor.instances\",\"5\")\\\n",
    "        .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入user_log数据\n",
    "user_log = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/data_format1/user_log_format1.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "#one_clicks、shopping_carts、purchase_times、favourite_times特征构建\n",
    "one_clicks_temp = user_log.groupby([\"user_id\",\"seller_id\",\"action_type\"]).count()\n",
    "one_clicks_temp = one_clicks_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"times\")\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import functions\n",
    "from pyspark.sql.types import *\n",
    "def click_time(action,times):\n",
    "    if action == 0:\n",
    "        return 0\n",
    "    else:\n",
    "        return times\n",
    "udf_click_time = functions.udf(click_time,IntegerType())\n",
    "one_clicks_temp = one_clicks_temp.withColumn(\"one_clicks\",udf_click_time(\"action_type\",\"times\"))\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "def shopping_click(action,times):\n",
    "    if action == 1:\n",
    "        return times\n",
    "    else:\n",
    "        return 0\n",
    "udf_click_time = functions.udf(shopping_click,IntegerType())\n",
    "one_clicks_temp = one_clicks_temp.withColumn(\"shopping_carts\",udf_click_time(\"action_type\",\"times\"))\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "def purchase_click(action,times):\n",
    "    if action == 2:\n",
    "        return times\n",
    "    else:\n",
    "        return 0\n",
    "udf_click_time = functions.udf(purchase_click,IntegerType())\n",
    "one_clicks_temp = one_clicks_temp.withColumn(\"purchase_times\",udf_click_time(\"action_type\",\"times\"))\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "def favor_click(action,times):\n",
    "    if action == 3:\n",
    "        return times\n",
    "    else:\n",
    "        return 0\n",
    "udf_click_time = functions.udf(favor_click,IntegerType())\n",
    "one_clicks_temp = one_clicks_temp.withColumn(\"favor_times\",udf_click_time(\"action_type\",\"times\"))\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [],
   "source": [
    "four_features = one_clicks_temp.groupby([\"user_id\",\"merchant_id\"]).sum()\n",
    "#four_features.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+----------+--------------+--------------+-----------+\n|user_id|merchant_id|one_clicks|shopping_carts|purchase_times|favor_times|\n+-------+-----------+----------+--------------+--------------+-----------+\n|  26535|       3322|         0|             0|             0|          0|\n| 324592|       1704|         0|             0|             0|          0|\n| 109128|       3639|         0|             0|             0|          0|\n| 135474|        476|         0|             0|             0|          0|\n| 265449|       1636|         1|             0|             1|          0|\n| 360493|        167|         1|             0|             1|          0|\n| 233113|        941|         0|             0|             0|          0|\n| 211495|       3450|         0|             0|             0|          0|\n| 230439|       3682|         0|             0|             0|          0|\n| 332399|        158|         0|             0|             0|          0|\n+-------+-----------+----------+--------------+--------------+-----------+\n\n"
     ]
    }
   ],
   "source": [
    "four_features = four_features.selectExpr(\"user_id\",\"merchant_id\",\"`sum(one_clicks)` as one_clicks\",\n",
    "\"`sum(shopping_carts)` as shopping_carts\",\"`sum(purchase_times)` as purchase_times\",\"`sum(favor_times)` as favor_times\")\n",
    "four_features.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入train_data数据\n",
    "df_train = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/procd_train_temp.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+\n|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|categories|browse_days|one_clicks|shopping_carts|purchase_times|favor_times|\n+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+\n|    464|       4718|    0|        6|     0|         4|              2|         2|          1|         1|             0|             1|          0|\n|    867|       3152|    0|        3|     0|        17|              3|         1|          4|         1|             0|             1|          0|\n|   1882|       4377|    0|        6|     1|         4|              1|         1|          1|         1|             0|             1|          0|\n|   2450|       2760|    0|        0|     0|         5|              1|         1|          1|         2|             0|             2|          0|\n|   2766|       3885|    0|        4|     1|         1|              1|         1|          1|         1|             0|             1|          0|\n|   2829|        467|    0|        4|     0|         3|              2|         1|          1|         1|             0|             1|          0|\n|   2861|       4973|    0|        6|     0|         1|              1|         1|          1|         1|             0|             1|          0|\n|   3359|       4090|    0|        2|     0|         8|              3|         2|          3|         1|             0|             1|          0|\n|   3487|       1861|    0|        6|     1|         4|              1|         1|          2|         1|             0|             1|          0|\n|   5460|       1485|    0|        4|     0|        58|             29|         4|          6|         3|             0|             3|          0|\n+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+\n\n"
     ]
    }
   ],
   "source": [
    "df_train = df_train.join(four_features,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train.write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/procd_train_real.csv\")\n",
    "df_train.write.parquet(\"hdfs://node1:9000/user/root/exp4/procd_train_real.parquet\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+\n|user_id|merchant_id|label|age_range|gender|total_logs|unique_item_ids|categories|browse_days|one_clicks|shopping_carts|purchase_times|favor_times|\n+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+\n|    464|       4718|    0|        6|     0|         4|              2|         2|          1|         1|             0|             1|          0|\n|    867|       3152|    0|        3|     0|        17|              3|         1|          4|         1|             0|             1|          0|\n|   1882|       4377|    0|        6|     1|         4|              1|         1|          1|         1|             0|             1|          0|\n|   2450|       2760|    0|        0|     0|         5|              1|         1|          1|         2|             0|             2|          0|\n|   2766|       3885|    0|        4|     1|         1|              1|         1|          1|         1|             0|             1|          0|\n|   2829|        467|    0|        4|     0|         3|              2|         1|          1|         1|             0|             1|          0|\n|   2861|       4973|    0|        6|     0|         1|              1|         1|          1|         1|             0|             1|          0|\n|   3359|       4090|    0|        2|     0|         8|              3|         2|          3|         1|             0|             1|          0|\n|   3487|       1861|    0|        6|     1|         4|              1|         1|          2|         1|             0|             1|          0|\n|   5460|       1485|    0|        4|     0|        58|             29|         4|          6|         3|             0|             3|          0|\n|   5679|       3484|    0|        4|     1|        28|              9|         3|          4|         3|             0|             1|          2|\n|   5747|       3502|    0|        2|     1|         4|              1|         1|          2|         1|             0|             1|          0|\n|   5779|       1346|    0|        3|     0|        19|              9|         1|          3|         6|             0|             1|          5|\n|   5804|       1359|    0|        4|     0|         7|              4|         2|          1|         2|             0|             2|          0|\n|   6481|       3473|    0|        3|     0|         6|              2|         1|          1|         1|             0|             1|          0|\n|   7100|       1193|    0|        5|     0|         1|              1|         1|          1|         1|             0|             1|          0|\n|   7229|        760|    0|        6|     0|         2|              1|         1|          1|         1|             0|             1|          0|\n|   7258|        145|    0|        0|     0|         9|              1|         1|          2|         2|             1|             1|          0|\n|   8182|        806|    1|        2|     0|        26|             10|         3|          3|         1|             0|             1|          0|\n|   8245|       4044|    0|        0|     1|         9|              3|         2|          1|         1|             0|             1|          0|\n+-------+-----------+-----+---------+------+----------+---------------+----------+-----------+----------+--------------+--------------+-----------+\nonly showing top 20 rows\n\n"
     ]
    }
   ],
   "source": [
    "#填充缺失值\n",
    "#第一种策略是将后8个特征所有null值填充为0\n",
    "df_train_filled = df_train.fillna(0)\n",
    "df_train_filled.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "#将数据转为合适的格式\n",
    "from pyspark.mllib.regression import LabeledPoint\n",
    "from pyspark.mllib.linalg import Vectors\n",
    "#先转成RDD\n",
    "df_train_rdd = df_train_filled.rdd\n",
    "#改成(label,features)的格式\n",
    "df_train_rdd = df_train_rdd.map(lambda line: LabeledPoint(line[2],Vectors.dense(line[3:])))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [],
   "source": [
    "#保存为LibSVMFile格式，方便后面训练使用\n",
    "from pyspark.mllib.util import MLUtils\n",
    "MLUtils.saveAsLibSVMFile(df_train_rdd, \"hdfs://node1:9000/user/root/exp4/procd_train_real\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [],
   "source": [
    "#别忘了关掉session\n",
    "spark.stop()"
   ]
  }
 ]
}